Deploying Retrieval Agents in Production

Durable execution, crash recovery, scaling agent workers, streaming intermediate results, and cost monitoring

Published: August 6, 2025

Keywords: durable execution, crash recovery, agent workers, streaming, cost monitoring, token budget, LangGraph Platform, Temporal, OpenTelemetry, production deployment, task queue, checkpointing, agent observability, scaling agents

Introduction

Building a retrieval agent that works in a notebook is one thing. Running it in production — where requests spike unpredictably, processes crash mid-execution, users expect real-time feedback, and every LLM call costs money — is an entirely different challenge. The gap between “demo agent” and “production agent” is not about smarter prompts or better tools. It is about infrastructure: durable execution, crash recovery, horizontal scaling, intermediate-result streaming, and cost controls.

A retrieval agent in production faces problems that never appear in development:

  • A worker process crashes after the third tool call in a five-step plan — does the user get an error, or does the agent resume from where it left off?
  • A hundred users submit deep-research queries simultaneously — does the system queue and distribute work, or does it fall over?
  • A query takes 45 seconds across six LLM calls — does the user stare at a spinner, or see each reasoning step as it happens?
  • A runaway agent loops 20 times on the same tool — who notices, and who stops it before the bill arrives?

This article covers the infrastructure patterns that answer these questions. We walk through durable execution with Temporal and LangGraph Platform, crash recovery via checkpointing and event sourcing, scaling with task queues and worker pools, streaming intermediate agent steps to users, and cost monitoring with OpenTelemetry and token budgets. Every pattern includes Python code you can adapt to your own stack.

Why Production Deployment Is Hard

Retrieval agents differ from traditional web services in ways that break standard deployment assumptions:

graph LR
    subgraph Traditional["Traditional API"]
        A1["Request"] --> A2["Process<br/>~100ms"] --> A3["Response"]
    end

    subgraph Agent["Retrieval Agent"]
        B1["Query"] --> B2["LLM Call 1<br/>~2s"]
        B2 --> B3["Tool Call<br/>~1s"]
        B3 --> B4["LLM Call 2<br/>~2s"]
        B4 --> B5["Tool Call<br/>~3s"]
        B5 --> B6["LLM Call 3<br/>~2s"]
        B6 --> B7["Response"]
    end

    style Traditional fill:#F2F2F2,stroke:#D9D9D9
    style Agent fill:#F2F2F2,stroke:#D9D9D9
    style A3 fill:#27ae60,color:#fff,stroke:#333
    style B7 fill:#27ae60,color:#fff,stroke:#333

| Characteristic | Traditional API | Retrieval Agent |
|---|---|---|
| Latency | 50–200 ms | 10–120 seconds |
| State | Stateless (single request) | Stateful (multi-step conversation) |
| External calls | 0–2 per request | 3–20 per request (LLM + tools) |
| Cost per request | Sub-cent (compute only) | $0.01–$1.00+ (LLM tokens) |
| Failure modes | Timeout, 5xx | Mid-execution crash, tool error, token budget exhaustion, infinite loop |
| Output | Complete response | Incremental (thought → action → observation → …) |

These differences mean you cannot just wrap an agent in a Flask endpoint behind a load balancer. You need purpose-built infrastructure for long-running, stateful, multi-step workloads.

Durable Execution: The Core Abstraction

Durable execution ensures that a long-running process can survive infrastructure failures — process crashes, deployments, machine restarts — and resume from exactly where it left off. For retrieval agents, this means a five-step reasoning chain does not restart from scratch when the worker handling it goes down.

How Durable Execution Works

sequenceDiagram
    participant U as User
    participant S as Orchestrator
    participant W1 as Worker A
    participant W2 as Worker B
    participant DB as State Store

    U->>S: Submit query
    S->>W1: Assign task
    W1->>DB: Save state (step 1)
    W1->>DB: Save state (step 2)
    Note over W1: Worker crashes!
    S-->>W2: Reassign task
    W2->>DB: Load state (step 2)
    W2->>DB: Save state (step 3)
    W2->>U: Return final answer

The pattern has three components:

  1. Orchestrator — Assigns tasks to workers and detects failures via heartbeats or timeouts
  2. State store — Persists execution state (messages, tool results, intermediate outputs) after each step
  3. Workers — Stateless processes that pick up tasks, load state, execute the next step, and save results

If a worker crashes, the orchestrator reassigns the task to another worker, which loads the last saved state and continues.
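The orchestrator's failure-detection half of this loop fits in a few lines. A minimal sketch, assuming workers publish heartbeat timestamps and each in-flight task records its owning worker — the key layout and dict-backed store are illustrative, not any particular library's API:

```python
HEARTBEAT_TTL = 30.0  # seconds of silence before a worker is presumed dead


def find_orphaned_tasks(store: dict[str, str], now: float) -> list[str]:
    """Return run_ids whose owning worker has stopped heartbeating."""
    orphaned = []
    for key, worker_id in store.items():
        if not key.startswith("task:owner:"):
            continue
        # Missing or stale heartbeat means the worker is presumed dead
        beat = store.get(f"worker:heartbeat:{worker_id}")
        if beat is None or now - float(beat) > HEARTBEAT_TTL:
            orphaned.append(key.removeprefix("task:owner:"))
    return orphaned
```

An orchestrator would run this on a timer and push each orphaned run_id back onto the task queue, where the next free worker loads its checkpoint and continues.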

The Checkpoint Pattern

The simplest durable execution pattern for agents is step-level checkpointing — save the full agent state after every LLM call or tool execution:

import json
import hashlib
from dataclasses import dataclass, field, asdict
from typing import Any


@dataclass
class AgentCheckpoint:
    """Serializable snapshot of agent execution state."""
    run_id: str
    step: int
    messages: list[dict[str, str]]
    tool_results: list[dict[str, Any]]
    total_tokens: int = 0
    total_cost_usd: float = 0.0
    status: str = "running"  # running | completed | failed

    def serialize(self) -> str:
        return json.dumps(asdict(self))

    @classmethod
    def deserialize(cls, data: str) -> "AgentCheckpoint":
        return cls(**json.loads(data))


class CheckpointStore:
    """Persist and retrieve agent checkpoints."""

    def __init__(self, backend: str = "redis"):
        if backend == "redis":
            import redis
            self._client = redis.Redis(decode_responses=True)
        else:
            raise ValueError(f"Unsupported checkpoint backend: {backend}")
        self._backend = backend

    def save(self, checkpoint: AgentCheckpoint) -> None:
        key = f"agent:checkpoint:{checkpoint.run_id}"
        self._client.set(key, checkpoint.serialize())
        self._client.expire(key, 86400)  # 24h TTL

    def load(self, run_id: str) -> AgentCheckpoint | None:
        key = f"agent:checkpoint:{run_id}"
        data = self._client.get(key)
        if data:
            return AgentCheckpoint.deserialize(data)
        return None

    def delete(self, run_id: str) -> None:
        self._client.delete(f"agent:checkpoint:{run_id}")

Integrating Checkpoints into the Agent Loop

from openai import OpenAI
import uuid

client = OpenAI()
store = CheckpointStore()


def run_durable_agent(
    query: str,
    run_id: str | None = None,
    model: str = "gpt-4o-mini",
    max_steps: int = 10,
) -> str:
    """Agent loop with step-level checkpointing for crash recovery."""
    # Resume from checkpoint or start fresh
    if run_id:
        checkpoint = store.load(run_id)
    else:
        run_id = str(uuid.uuid4())
        checkpoint = None

    if checkpoint and checkpoint.status == "completed":
        return checkpoint.messages[-1]["content"]

    if checkpoint is None:
        checkpoint = AgentCheckpoint(
            run_id=run_id,
            step=0,
            messages=[
                {"role": "system", "content": "You are a helpful retrieval agent."},
                {"role": "user", "content": query},
            ],
            tool_results=[],
        )
        store.save(checkpoint)

    for step in range(checkpoint.step, max_steps):
        # Call LLM
        response = client.chat.completions.create(
            model=model,
            messages=checkpoint.messages,
            tools=TOOL_SCHEMAS,
            tool_choice="auto",
            temperature=0,
        )

        msg = response.choices[0].message
        checkpoint.messages.append(msg.model_dump())
        checkpoint.total_tokens += response.usage.total_tokens
        checkpoint.step = step + 1

        # No tool calls → final answer
        if not msg.tool_calls:
            checkpoint.status = "completed"
            store.save(checkpoint)
            return msg.content

        # Execute tool calls
        for tool_call in msg.tool_calls:
            result = execute_tool(tool_call)
            checkpoint.messages.append({
                "role": "tool",
                "tool_call_id": tool_call.id,
                "content": str(result),
            })
            checkpoint.tool_results.append({
                "step": step,
                "tool": tool_call.function.name,
                "result": str(result)[:500],
            })

        # Checkpoint after every step
        store.save(checkpoint)

    checkpoint.status = "failed"
    store.save(checkpoint)
    return "Agent reached maximum steps."

If the process crashes at any point, calling run_durable_agent(query, run_id=same_id) will load the last checkpoint and resume from the exact step where execution stopped — without re-executing completed LLM calls or tool invocations.

Crash Recovery Patterns

There are three main strategies for recovering from crashes during agent execution, each with different trade-offs:

Strategy 1: Checkpoint-Resume

Save state after every step. On crash, load the last checkpoint and continue.

# Already shown above — the simplest and most common pattern
# Pro: Simple, no replay
# Con: Requires serializable state

Strategy 2: Event Sourcing

Store every event (LLM call, tool result) as an immutable log. On recovery, replay the full event log to reconstruct state.

@dataclass
class AgentEvent:
    run_id: str
    sequence: int
    event_type: str  # "llm_call" | "tool_result" | "final_answer"
    payload: dict
    timestamp: float


class EventStore:
    """Append-only event log for agent execution."""

    def __init__(self):
        import redis
        self._client = redis.Redis(decode_responses=True)

    def append(self, event: AgentEvent) -> None:
        key = f"agent:events:{event.run_id}"
        self._client.rpush(key, json.dumps(asdict(event)))

    def replay(self, run_id: str) -> list[AgentEvent]:
        key = f"agent:events:{run_id}"
        raw_events = self._client.lrange(key, 0, -1)
        return [
            AgentEvent(**json.loads(e)) for e in raw_events
        ]


def recover_from_events(run_id: str) -> AgentCheckpoint:
    """Rebuild agent state by replaying the event log."""
    store = EventStore()
    events = store.replay(run_id)

    messages = [{"role": "system", "content": "You are a helpful retrieval agent."}]
    total_tokens = 0

    for event in events:
        if event.event_type == "user_query":
            messages.append({"role": "user", "content": event.payload["query"]})
        elif event.event_type == "llm_call":
            messages.append(event.payload["message"])
            total_tokens += event.payload.get("tokens", 0)
        elif event.event_type == "tool_result":
            messages.append({
                "role": "tool",
                "tool_call_id": event.payload["tool_call_id"],
                "content": event.payload["result"],
            })

    return AgentCheckpoint(
        run_id=run_id,
        step=len(events),
        messages=messages,
        tool_results=[],
        total_tokens=total_tokens,
    )

Strategy 3: Idempotent Replay (Temporal-Style)

Temporal’s approach: the workflow function itself is deterministic. On crash, Temporal replays the workflow from the beginning, but cached results from previous activity executions are returned instantly instead of re-executing.
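Since the replay machinery lives inside Temporal's SDK, here is only a toy model of the idea — a loop whose side-effecting calls are memoized by invocation order, so a replay after a crash returns cached results instead of re-executing (the class and names are illustrative, not Temporal's API):

```python
from typing import Any, Callable


class ReplayContext:
    """Toy model of deterministic replay with memoized activity results."""

    def __init__(self, history: list[Any]):
        self.history = history  # results of completed activities, in order
        self.cursor = 0

    def execute_activity(self, fn: Callable, *args) -> Any:
        if self.cursor < len(self.history):
            result = self.history[self.cursor]  # replaying: cached result
        else:
            result = fn(*args)                  # live: run and record
            self.history.append(result)
        self.cursor += 1
        return result
```

Because the workflow code is deterministic, the Nth call to `execute_activity` on replay lines up with the Nth recorded result — the crash point is found by simply running out of history.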

Comparison

| Strategy | Complexity | Recovery speed | Storage | Best for |
|---|---|---|---|---|
| Checkpoint-Resume | Low | Instant | Last state only | Simple agents, <10 steps |
| Event Sourcing | Medium | O(n) replay | Full history | Auditing, debugging |
| Idempotent Replay | High | O(n) replay (cached) | Event log + results | Long workflows, Temporal |

Scaling Agent Workers

A single agent worker can handle one request at a time (due to synchronous LLM calls). Scaling requires distributing work across a pool of workers via a task queue.

Architecture

graph TD
    subgraph API["API Layer"]
        A1["Load Balancer"]
    end

    subgraph Queue["Task Queue"]
        Q1["Redis / RabbitMQ / SQS"]
    end

    subgraph Workers["Agent Worker Pool"]
        W1["Worker 1"]
        W2["Worker 2"]
        W3["Worker 3"]
        W4["Worker N"]
    end

    subgraph Storage["Shared State"]
        S1["Postgres / Redis"]
    end

    A1 --> Q1
    Q1 --> W1
    Q1 --> W2
    Q1 --> W3
    Q1 --> W4
    W1 --> S1
    W2 --> S1
    W3 --> S1
    W4 --> S1

    style API fill:#F2F2F2,stroke:#D9D9D9
    style Queue fill:#F2F2F2,stroke:#D9D9D9
    style Workers fill:#F2F2F2,stroke:#D9D9D9
    style Storage fill:#F2F2F2,stroke:#D9D9D9

Task Queue with Celery

from celery import Celery
import json
import uuid

app = Celery("agent_workers", broker="redis://localhost:6379/0")
app.conf.update(
    task_serializer="json",
    result_backend="redis://localhost:6379/1",
    task_acks_late=True,           # Re-deliver on crash
    worker_prefetch_multiplier=1,  # One task at a time
    task_time_limit=120,           # Hard timeout: 2 minutes
    task_soft_time_limit=90,       # Soft timeout: 1.5 minutes
)


@app.task(bind=True, max_retries=2, default_retry_delay=5)
def run_agent_task(self, query: str, run_id: str | None = None):
    """Execute an agent run as a distributed task."""
    # Pin a run_id up front so Celery retries resume from the same checkpoint
    run_id = run_id or str(uuid.uuid4())
    try:
        result = run_durable_agent(query, run_id=run_id)
        return {"status": "completed", "answer": result, "run_id": run_id}
    except Exception as exc:
        # Retry with the same run_id to resume from checkpoint
        raise self.retry(exc=exc)

Async Task Queue with asyncio

For higher concurrency without Celery overhead:

import asyncio
import redis.asyncio as aioredis


class AgentWorkerPool:
    """Lightweight async worker pool using Redis as a task queue."""

    def __init__(self, num_workers: int = 4):
        self.num_workers = num_workers
        self.redis = aioredis.Redis()
        self.queue_key = "agent:task_queue"

    async def submit(self, query: str, run_id: str) -> None:
        """Submit a task to the queue."""
        task = json.dumps({"query": query, "run_id": run_id})
        await self.redis.rpush(self.queue_key, task)

    async def worker(self, worker_id: int) -> None:
        """Worker loop: pull tasks, execute agents, save results."""
        while True:
            # Blocking pop with 5s timeout
            item = await self.redis.blpop(self.queue_key, timeout=5)
            if item is None:
                continue

            _, raw_task = item
            task = json.loads(raw_task)

            try:
                # Run the sync agent in a worker thread so it does not
                # block the event loop for the other workers
                result = await asyncio.to_thread(
                    run_durable_agent, task["query"], run_id=task["run_id"]
                )
                await self.redis.set(
                    f"agent:result:{task['run_id']}",
                    json.dumps({"status": "completed", "answer": result}),
                )
            except Exception as e:
                await self.redis.set(
                    f"agent:result:{task['run_id']}",
                    json.dumps({"status": "failed", "error": str(e)}),
                )

    async def start(self) -> None:
        """Start the worker pool."""
        workers = [
            asyncio.create_task(self.worker(i))
            for i in range(self.num_workers)
        ]
        await asyncio.gather(*workers)

Scaling Heuristics

| Factor | Guideline |
|---|---|
| Workers per CPU | 2–4 (most time is spent waiting on LLM API I/O) |
| Queue depth alert | Trigger auto-scaling when queue depth > 2× worker count |
| Task timeout | 2× expected max agent runtime (e.g., 120s if max is 60s) |
| Retry limit | 2–3 retries with exponential backoff |
| Heartbeat interval | Every 10–30s for long-running tasks |
| Prefetch | 1 task per worker (a single agent run executes sequentially) |
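The queue-depth guideline translates directly into a scaling rule. A minimal sketch — the threshold is the table's, the function itself is illustrative:

```python
import math


def desired_workers(queue_depth: int, min_workers: int = 2,
                    max_workers: int = 32) -> int:
    """Pick a worker count so queue depth stays at or below 2x workers."""
    target = math.ceil(queue_depth / 2)
    return max(min_workers, min(max_workers, target))
```

An autoscaler would poll the queue depth every few seconds and reconcile the pool toward this target, with a cooldown on scale-down to avoid thrashing.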

Streaming Intermediate Results

Users should not wait 30 seconds staring at a loading spinner. Stream each reasoning step — thought, action, observation — as it happens.

Server-Sent Events (SSE) Pattern

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
import json

app = FastAPI()


async def stream_agent_steps(query: str, run_id: str):
    """Generator that yields agent steps as SSE events."""
    checkpoint = AgentCheckpoint(
        run_id=run_id, step=0,
        messages=[
            {"role": "system", "content": "You are a helpful retrieval agent."},
            {"role": "user", "content": query},
        ],
        tool_results=[],
    )

    for step in range(10):
        # Send "thinking" event
        yield f"data: {json.dumps({'type': 'thinking', 'step': step})}\n\n"

        # Note: a sync client blocks the event loop; prefer AsyncOpenAI in production
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=checkpoint.messages,
            tools=TOOL_SCHEMAS,
            tool_choice="auto",
            temperature=0,
            stream=True,
        )

        # Stream LLM tokens and accumulate tool-call deltas by index
        full_content = ""
        pending_tool_calls: dict[int, dict] = {}
        finish_reason = None
        for chunk in response:
            choice = chunk.choices[0]
            if choice.finish_reason:
                finish_reason = choice.finish_reason
            delta = choice.delta
            if delta.content:
                full_content += delta.content
                yield f"data: {json.dumps({'type': 'token', 'content': delta.content})}\n\n"

            for tc in delta.tool_calls or []:
                entry = pending_tool_calls.setdefault(
                    tc.index, {"id": "", "name": "", "arguments": ""}
                )
                if tc.id:
                    entry["id"] = tc.id
                if tc.function and tc.function.name:
                    entry["name"] = tc.function.name
                    yield f"data: {json.dumps({'type': 'tool_call', 'tool': tc.function.name})}\n\n"
                if tc.function and tc.function.arguments:
                    entry["arguments"] += tc.function.arguments

        # No tool calls → final answer
        if finish_reason != "tool_calls":
            yield f"data: {json.dumps({'type': 'answer', 'content': full_content})}\n\n"
            yield "data: [DONE]\n\n"
            return

        # Record the assistant turn so the next LLM call sees its tool calls
        tool_calls = [
            {"id": e["id"], "type": "function",
             "function": {"name": e["name"], "arguments": e["arguments"]}}
            for e in pending_tool_calls.values()
        ]
        checkpoint.messages.append(
            {"role": "assistant", "content": full_content or None, "tool_calls": tool_calls}
        )

        # Execute tools and stream observations
        for tool_call in tool_calls:
            name = tool_call["function"]["name"]
            yield f"data: {json.dumps({'type': 'tool_start', 'tool': name})}\n\n"
            result = execute_tool(tool_call)  # receives the OpenAI tool-call dict shape
            yield f"data: {json.dumps({'type': 'observation', 'tool': name, 'result': str(result)[:500]})}\n\n"

            checkpoint.messages.append({
                "role": "tool",
                "tool_call_id": tool_call["id"],
                "content": str(result),
            })

    yield "data: [DONE]\n\n"


@app.get("/agent/stream")
async def agent_stream(query: str):
    run_id = str(uuid.uuid4())
    return StreamingResponse(
        stream_agent_steps(query, run_id),
        media_type="text/event-stream",
    )

LangGraph Streaming

LangGraph provides built-in streaming with event types for each graph node:

from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
agent = create_react_agent(model=llm, tools=tools)


async def stream_langgraph_agent(query: str):
    """Stream LangGraph agent execution step by step."""
    async for event in agent.astream_events(
        {"messages": [{"role": "user", "content": query}]},
        version="v2",
    ):
        kind = event["event"]

        if kind == "on_chat_model_stream":
            content = event["data"]["chunk"].content
            if content:
                yield {
                    "type": "token",
                    "content": content,
                    "node": event.get("metadata", {}).get("langgraph_node"),
                }

        elif kind == "on_tool_start":
            yield {
                "type": "tool_start",
                "tool": event["name"],
                "input": event["data"].get("input"),
            }

        elif kind == "on_tool_end":
            yield {
                "type": "tool_end",
                "tool": event["name"],
                "output": str(event["data"].get("output"))[:500],
            }

Double-Texting: Handling User Interruptions

What happens when a user sends a new message while the agent is still processing the previous one? LangGraph Platform supports four strategies:

| Strategy | Behavior | Use case |
|---|---|---|
| Reject | Return error; ignore the new input | Strict sequential processing |
| Queue | Finish current run, then process new input | Ordered processing |
| Interrupt | Cancel current run, start new one from latest state | Conversational UX |
| Rollback | Cancel current run, revert to state before it started, process new input | Clean restart |

# LangGraph Platform handles double-texting via API configuration
# When self-hosting, implement Interrupt strategy:

class InterruptibleAgent:
    """Agent that supports cancellation for double-texting."""

    def __init__(self):
        self._cancel_event = asyncio.Event()

    def cancel(self) -> None:
        self._cancel_event.set()

    async def run_step(self, checkpoint):
        """Run a single agent step, checking for cancellation."""
        if self._cancel_event.is_set():
            raise asyncio.CancelledError("Interrupted by new user message")

        response = await aclient.chat.completions.create(
            model="gpt-4o-mini",
            messages=checkpoint.messages,
            tools=TOOL_SCHEMAS,
            tool_choice="auto",
        )
        return response
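The Queue strategy is even simpler to self-host: serialize runs per conversation thread so a second message waits for the current run to finish instead of interleaving with it. A minimal sketch using one asyncio.Lock per thread_id (the class and names are illustrative):

```python
import asyncio
from collections import defaultdict


class PerThreadSerializer:
    """Queue strategy: one run at a time per conversation thread."""

    def __init__(self):
        self._locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)

    async def run(self, thread_id: str, coro_fn, *args):
        # A second message on the same thread waits here until the
        # in-flight run releases the lock, preserving arrival order
        async with self._locks[thread_id]:
            return await coro_fn(*args)
```

A single-process sketch only; across multiple workers the same ordering needs a distributed lock or a per-thread queue key in Redis.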

Cost Monitoring and Token Budgeting

LLM API calls are the dominant cost in agent deployments. A single agent run can consume 10,000–100,000 tokens across multiple steps. Without monitoring, costs spiral out of control.
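A quick back-of-envelope, using the per-million-token prices assumed later in this section, shows why model choice dominates the bill:

```python
# Illustrative per-1M-token prices (same assumptions as the TokenBudget class)
INPUT_PRICE = {"gpt-4o-mini": 0.15, "gpt-4o": 2.50}
OUTPUT_PRICE = {"gpt-4o-mini": 0.60, "gpt-4o": 10.00}


def run_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """Total USD cost of one agent run at the prices above."""
    return (input_tokens * INPUT_PRICE[model]
            + output_tokens * OUTPUT_PRICE[model]) / 1_000_000
```

A multi-step run consuming 50k input and 5k output tokens costs about $0.0105 on gpt-4o-mini but about $0.175 on gpt-4o — a 16x spread from model choice alone, before any retries or loops.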

Token Budget Enforcement

from dataclasses import dataclass


@dataclass
class TokenBudget:
    """Per-run token budget with real-time tracking."""
    max_input_tokens: int = 50_000
    max_output_tokens: int = 10_000
    max_total_cost_usd: float = 0.50
    max_steps: int = 15

    # Tracking
    used_input_tokens: int = 0
    used_output_tokens: int = 0
    used_cost_usd: float = 0.0
    steps_taken: int = 0

    # Pricing (per 1M tokens)
    INPUT_PRICE = {"gpt-4o-mini": 0.15, "gpt-4o": 2.50}
    OUTPUT_PRICE = {"gpt-4o-mini": 0.60, "gpt-4o": 10.00}

    def record_usage(self, model: str, input_tokens: int, output_tokens: int):
        self.used_input_tokens += input_tokens
        self.used_output_tokens += output_tokens
        self.used_cost_usd += (
            input_tokens * self.INPUT_PRICE.get(model, 1.0) / 1_000_000
            + output_tokens * self.OUTPUT_PRICE.get(model, 4.0) / 1_000_000
        )
        self.steps_taken += 1

    def check(self) -> str | None:
        """Return a violation reason, or None if within budget."""
        if self.used_input_tokens > self.max_input_tokens:
            return f"Input token limit exceeded ({self.used_input_tokens}/{self.max_input_tokens})"
        if self.used_output_tokens > self.max_output_tokens:
            return f"Output token limit exceeded ({self.used_output_tokens}/{self.max_output_tokens})"
        if self.used_cost_usd > self.max_total_cost_usd:
            return f"Cost limit exceeded (${self.used_cost_usd:.4f}/${self.max_total_cost_usd})"
        if self.steps_taken > self.max_steps:
            return f"Step limit exceeded ({self.steps_taken}/{self.max_steps})"
        return None


def run_budget_aware_agent(query: str, budget: TokenBudget) -> dict:
    """Agent loop with per-run token budget enforcement."""
    messages = [
        {"role": "system", "content": "You are a helpful retrieval agent."},
        {"role": "user", "content": query},
    ]
    model = "gpt-4o-mini"

    for step in range(budget.max_steps):
        response = client.chat.completions.create(
            model=model,
            messages=messages,
            tools=TOOL_SCHEMAS,
            tool_choice="auto",
            temperature=0,
        )

        usage = response.usage
        budget.record_usage(model, usage.prompt_tokens, usage.completion_tokens)

        # Check budget after every LLM call
        violation = budget.check()
        if violation:
            return {
                "status": "budget_exceeded",
                "reason": violation,
                "partial_answer": messages[-1].get("content", ""),
                "cost_usd": budget.used_cost_usd,
                "total_tokens": budget.used_input_tokens + budget.used_output_tokens,
            }

        msg = response.choices[0].message
        messages.append(msg.model_dump())

        if not msg.tool_calls:
            return {
                "status": "completed",
                "answer": msg.content,
                "cost_usd": budget.used_cost_usd,
                "total_tokens": budget.used_input_tokens + budget.used_output_tokens,
                "steps": budget.steps_taken,
            }

        for tool_call in msg.tool_calls:
            result = execute_tool(tool_call)
            messages.append({
                "role": "tool",
                "tool_call_id": tool_call.id,
                "content": str(result),
            })

    return {"status": "max_steps", "cost_usd": budget.used_cost_usd}

OpenTelemetry Instrumentation

OpenTelemetry’s GenAI semantic conventions define standard metrics for LLM operations. Key metrics for cost monitoring:

| Metric | Type | What it tracks |
|---|---|---|
| gen_ai.client.token.usage | Histogram | Input and output token counts per call |
| gen_ai.client.operation.duration | Histogram | Latency per LLM call in seconds |
| gen_ai.server.time_to_first_token | Histogram | Time to first streaming token |
| gen_ai.server.time_per_output_token | Histogram | Decode speed (tokens/sec) |

from opentelemetry import trace, metrics
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader

# Setup
trace.set_tracer_provider(TracerProvider())
trace.get_tracer_provider().add_span_processor(
    BatchSpanProcessor(OTLPSpanExporter())
)
reader = PeriodicExportingMetricReader(OTLPMetricExporter())
metrics.set_meter_provider(MeterProvider(metric_readers=[reader]))

tracer = trace.get_tracer("agent.service")
meter = metrics.get_meter("agent.service")

# Define metrics following OTel GenAI conventions
token_usage = meter.create_histogram(
    "gen_ai.client.token.usage",
    unit="{token}",
    description="Number of input and output tokens used",
)
operation_duration = meter.create_histogram(
    "gen_ai.client.operation.duration",
    unit="s",
    description="GenAI operation duration",
)
agent_run_cost = meter.create_histogram(
    "agent.run.cost",
    unit="USD",
    description="Total cost per agent run",
)
agent_run_steps = meter.create_histogram(
    "agent.run.steps",
    unit="{step}",
    description="Number of steps per agent run",
)


def instrumented_llm_call(messages, model, tools=None):
    """LLM call with OpenTelemetry tracing and metrics."""
    import time

    with tracer.start_as_current_span(
        f"chat {model}",
        attributes={
            "gen_ai.operation.name": "chat",
            "gen_ai.request.model": model,
            "gen_ai.provider.name": "openai",
        },
    ) as span:
        start = time.perf_counter()

        response = client.chat.completions.create(
            model=model,
            messages=messages,
            tools=tools,
            tool_choice="auto" if tools else None,
            temperature=0,
        )

        duration = time.perf_counter() - start

        # Record span attributes
        usage = response.usage
        span.set_attribute("gen_ai.response.model", response.model)
        span.set_attribute("gen_ai.usage.input_tokens", usage.prompt_tokens)
        span.set_attribute("gen_ai.usage.output_tokens", usage.completion_tokens)

        # Record metrics
        attrs = {
            "gen_ai.operation.name": "chat",
            "gen_ai.request.model": model,
            "gen_ai.provider.name": "openai",
        }
        token_usage.record(
            usage.prompt_tokens,
            {**attrs, "gen_ai.token.type": "input"},
        )
        token_usage.record(
            usage.completion_tokens,
            {**attrs, "gen_ai.token.type": "output"},
        )
        operation_duration.record(duration, attrs)

        return response

Cost Dashboard Queries

With OpenTelemetry data flowing to your observability backend, build dashboards to answer:

# Prometheus/PromQL examples

# Total tokens consumed in the last hour
sum(increase(gen_ai_client_token_usage_sum[1h]))

# Average cost per agent run (using custom metric)
histogram_quantile(0.5, rate(agent_run_cost_bucket[1h]))

# P95 latency per LLM call
histogram_quantile(0.95, rate(gen_ai_client_operation_duration_bucket[1h]))

# Token usage by model
sum by (gen_ai_request_model) (
    increase(gen_ai_client_token_usage_sum[1h])
)

# Runs exceeding budget (custom counter)
increase(agent_budget_violations_total[1h])

Deploying with LangGraph Platform

LangGraph Platform (formerly LangGraph Cloud) provides managed infrastructure for deploying LangGraph agents at scale. It handles task queues, Postgres checkpointing, streaming, double-texting, and monitoring out of the box.

Architecture

graph TD
    subgraph Platform["LangGraph Platform"]
        API["HTTP/WS API"]
        TQ["Task Queue"]
        PG["Postgres Checkpointer"]
        W["Auto-scaled Workers"]
        LS["LangSmith Integration"]
    end

    U["Users"] --> API
    API --> TQ
    TQ --> W
    W --> PG
    W --> LS

    style Platform fill:#F2F2F2,stroke:#D9D9D9
    style U fill:#4a90d9,color:#fff,stroke:#333
    style API fill:#9b59b6,color:#fff,stroke:#333
    style TQ fill:#e67e22,color:#fff,stroke:#333
    style PG fill:#27ae60,color:#fff,stroke:#333
    style W fill:#1abc9c,color:#fff,stroke:#333
    style LS fill:#f5a623,color:#fff,stroke:#333

Key Features

| Feature | Description |
|---|---|
| Durable execution | Postgres-backed checkpointer persists state after every node |
| Horizontal scaling | Auto-scaled task queues distribute runs across workers |
| Streaming | Built-in SSE streaming of tokens, tool calls, and node transitions |
| Double-texting | Four strategies: reject, queue, interrupt, rollback |
| Background jobs | Async runs with polling or webhook notification |
| Cron jobs | Scheduled agent runs (e.g., daily report generation) |
| LangSmith tracing | Automatic tracing, monitoring, and cost tracking |

Self-Hosted Deployment

For teams that cannot use a managed service, LangGraph Platform can be self-hosted:

# langgraph.json — deployment configuration
{
    "graphs": {
        "retrieval_agent": "./agent.py:graph"
    },
    "dependencies": ["langchain-openai", "langchain-community"],
    "env": {
        "OPENAI_API_KEY": "",
        "LANGSMITH_API_KEY": ""
    }
}
# agent.py — the graph definition
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode
from langgraph.checkpoint.postgres import PostgresSaver
from langchain_openai import ChatOpenAI


class State(TypedDict):
    messages: Annotated[list, add_messages]


llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
llm_with_tools = llm.bind_tools(tools)


def agent(state: State) -> dict:
    response = llm_with_tools.invoke(state["messages"])
    return {"messages": [response]}


def should_continue(state: State) -> str:
    last = state["messages"][-1]
    if hasattr(last, "tool_calls") and last.tool_calls:
        return "tools"
    return END


builder = StateGraph(State)
builder.add_node("agent", agent)
builder.add_node("tools", ToolNode(tools))
builder.set_entry_point("agent")
builder.add_conditional_edges("agent", should_continue, {"tools": "tools", END: END})
builder.add_edge("tools", "agent")

# Compile with Postgres checkpointer for durable execution.
# Note: in recent langgraph-checkpoint-postgres releases, from_conn_string
# returns a context manager; there, use
# `with PostgresSaver.from_conn_string(...) as checkpointer:` instead.
checkpointer = PostgresSaver.from_conn_string(
    "postgresql://user:pass@localhost:5432/langgraph"
)
graph = builder.compile(checkpointer=checkpointer)

Deploying with Temporal

Temporal is a general-purpose durable execution platform. Unlike LangGraph Platform (which is agent-specific), Temporal handles any long-running workflow — making it ideal when agents are one component of a larger business process.

Modeling an Agent as a Temporal Workflow

import json
from datetime import timedelta
from dataclasses import dataclass

from openai import AsyncOpenAI
from temporalio import workflow, activity
from temporalio.client import Client
from temporalio.common import RetryPolicy
from temporalio.worker import Worker


@dataclass
class AgentInput:
    query: str
    model: str = "gpt-4o-mini"
    max_steps: int = 10


@dataclass
class LLMCallInput:
    messages: list
    model: str
    tools: list | None = None


@dataclass
class ToolCallInput:
    tool_name: str
    tool_args: dict


# Activities: non-deterministic operations (LLM calls, tool execution)
@activity.defn
async def call_llm(input: LLMCallInput) -> dict:
    """Call the LLM API. This is an Activity because it has side effects."""
    response = client.chat.completions.create(
        model=input.model,
        messages=input.messages,
        tools=input.tools,
        tool_choice="auto" if input.tools else None,
        temperature=0,
    )
    return {
        "message": response.choices[0].message.model_dump(),
        "usage": {
            "input_tokens": response.usage.prompt_tokens,
            "output_tokens": response.usage.completion_tokens,
        },
    }


@activity.defn
async def execute_tool_activity(input: ToolCallInput) -> str:
    """Execute a tool. Activity because it may call external APIs."""
    # TOOLS (name -> callable) and TOOL_SCHEMAS (the matching OpenAI JSON
    # schemas) are the tool registry built earlier in the article.
    tool_fn = TOOLS.get(input.tool_name)
    if not tool_fn:
        return f"Unknown tool: {input.tool_name}"
    return str(tool_fn(**input.tool_args))


# Workflow: the deterministic agent loop
@workflow.defn(name="RetrievalAgent")
class RetrievalAgentWorkflow:
    @workflow.run
    async def run(self, input: AgentInput) -> dict:
        messages = [
            {"role": "system", "content": "You are a helpful retrieval agent."},
            {"role": "user", "content": input.query},
        ]
        total_tokens = 0

        for step in range(input.max_steps):
            # Call LLM (as an Activity — survives crashes)
            result = await workflow.execute_activity(
                call_llm,
                LLMCallInput(
                    messages=messages,
                    model=input.model,
                    tools=TOOL_SCHEMAS,
                ),
                start_to_close_timeout=timedelta(seconds=30),
                retry_policy=RetryPolicy(maximum_attempts=3),
            )

            msg = result["message"]
            messages.append(msg)
            total_tokens += (
                result["usage"]["input_tokens"]
                + result["usage"]["output_tokens"]
            )

            # No tool calls → done
            tool_calls = msg.get("tool_calls") or []
            if not tool_calls:
                return {
                    "answer": msg.get("content", ""),
                    "total_tokens": total_tokens,
                    "steps": step + 1,
                }

            # Execute each tool call as an Activity
            for tc in tool_calls:
                tool_result = await workflow.execute_activity(
                    execute_tool_activity,
                    ToolCallInput(
                        tool_name=tc["function"]["name"],
                        tool_args=json.loads(tc["function"]["arguments"]),
                    ),
                    start_to_close_timeout=timedelta(seconds=30),
                )
                messages.append({
                    "role": "tool",
                    "tool_call_id": tc["id"],
                    "content": tool_result,
                })

        return {
            "answer": "Max steps reached",
            "total_tokens": total_tokens,
            "steps": input.max_steps,
        }


# Worker setup
async def main():
    client = await Client.connect("localhost:7233")
    worker = Worker(
        client,
        task_queue="agent-tasks",
        workflows=[RetrievalAgentWorkflow],
        activities=[call_llm, execute_tool_activity],
    )
    await worker.run()


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())
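
Temporal's crash recovery rests on replay: after a crash, the deterministic workflow code re-executes from the top, but results of already-completed Activities are served from the recorded event history instead of being re-run. A dependency-free toy of that idea (illustrative only, not the Temporal SDK):

```python
class EventHistory:
    """Append-only log of completed activity results (toy event sourcing)."""

    def __init__(self):
        self.results: list = []


class ReplayRunner:
    """Re-executes workflow code; serves recorded results during replay."""

    def __init__(self, history: EventHistory):
        self.history = history
        self.cursor = 0
        self.executions = 0  # count of real, side-effecting calls

    def execute_activity(self, fn, *args):
        if self.cursor < len(self.history.results):
            result = self.history.results[self.cursor]  # replayed, no side effect
        else:
            result = fn(*args)                          # first time: really run
            self.executions += 1
            self.history.results.append(result)
        self.cursor += 1
        return result


def agent_workflow(runner, crash_at_step=None):
    """Deterministic 'workflow': three activities, optional simulated crash."""
    out = []
    for i, step in enumerate(["retrieve", "rerank", "answer"]):
        out.append(runner.execute_activity(str.upper, step))
        if crash_at_step == i:
            raise RuntimeError("process killed")
    return out


history = EventHistory()
try:
    agent_workflow(ReplayRunner(history), crash_at_step=1)  # dies mid-run
except RuntimeError:
    pass

# A new process replays against the same history: the first two activities
# are not re-executed; only the remaining one actually runs.
runner = ReplayRunner(history)
print(agent_workflow(runner))  # ['RETRIEVE', 'RERANK', 'ANSWER']
print(runner.executions)       # 1
```

This is why Temporal requires workflow code to be deterministic and side effects to live in Activities: replay must reproduce the same sequence of `execute_activity` calls to line up with the recorded history.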

Temporal vs. LangGraph Platform

Aspect           | Temporal                                     | LangGraph Platform
-----------------|----------------------------------------------|------------------------------
Purpose          | General-purpose durable execution            | Agent-specific deployment
Crash recovery   | Automatic via event sourcing + replay        | Postgres checkpointing
Streaming        | Custom implementation (queries/signals)      | Built-in SSE streaming
Scaling          | Worker pools with task queues                | Managed auto-scaling
Double-texting   | Custom via signals/cancellation              | Built-in (4 strategies)
Agent frameworks | Framework-agnostic                           | LangGraph-specific
Observability    | Temporal UI + custom metrics                 | LangSmith integration
Self-hosting     | Docker Compose or Kubernetes                 | Docker or managed cloud
Best for         | Agents embedded in larger business workflows | Standalone agent deployments

Production Deployment Checklist

Before deploying a retrieval agent to production, verify each item:

Infrastructure

  • Checkpointer (Redis or Postgres) configured and reachable from every worker
  • Task queue provisioned, with workers auto-scaled on queue depth
  • Worker prefetch set to 1 so each long-running agent task occupies one worker

Reliability

  • Crash recovery verified: kill a worker mid-run and confirm the run resumes without re-executing completed steps
  • Retry policies on LLM and tool calls
  • Hard cap on steps per run to stop runaway loops

Cost Controls

  • Per-run budget enforced on input tokens, output tokens, total cost, and step count
  • Alerts on cost anomalies and unusually long runs

Observability

  • Every LLM call traced, with token usage recorded as metrics (OpenTelemetry GenAI conventions)
  • Every step logged with a run_id for end-to-end debugging

User Experience

Conclusion

Deploying retrieval agents in production requires infrastructure that traditional web services do not need. The core challenges — long-running stateful execution, unpredictable costs, multi-step failure modes, and real-time user feedback — each require purpose-built solutions.

Key takeaways:

  • Durable execution is non-negotiable — Agents take 10–120 seconds across multiple steps. Without checkpointing, any infrastructure failure restarts the entire run, wasting tokens and time. Use step-level checkpointing (simplest), event sourcing (for audit trails), or Temporal (for complex business workflows).
  • Scale with task queues, not threads — Agent workers are I/O bound (waiting on LLM APIs). Distribute work via Redis, RabbitMQ, or managed task queues. Set prefetch to 1 (one task per worker) and auto-scale on queue depth.
  • Stream everything — Users need real-time feedback during long runs. Stream thoughts, tool calls, and observations via Server-Sent Events. Handle double-texting with interrupt or queue strategies.
  • Budget every run — Token costs are the primary operational expense. Enforce per-run limits on input tokens, output tokens, total cost, and step count. Use OpenTelemetry’s GenAI semantic conventions (gen_ai.client.token.usage, gen_ai.client.operation.duration) for standard metrics across providers.
  • LangGraph Platform provides agent-specific infrastructure (checkpointing, streaming, double-texting, LangSmith tracing) as a managed service. Temporal provides general-purpose durable execution for agents that are part of larger business workflows.
  • Observability is the foundation — You cannot control what you cannot see. Instrument every LLM call with distributed tracing, record token counts as metrics, set alerts on cost anomalies, and log every step with a run_id for end-to-end debugging.
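
The per-run limits in the budgeting takeaway amount to a small guard object checked after every LLM call. A minimal sketch (hypothetical class and example per-million-token prices — substitute your provider's rates and your own thresholds):

```python
from dataclasses import dataclass

# Assumed example prices per 1M tokens; substitute your provider's rates.
INPUT_PRICE_PER_M = 0.15
OUTPUT_PRICE_PER_M = 0.60


class BudgetExceeded(Exception):
    pass


@dataclass
class RunBudget:
    """Per-run limits on tokens, cost, and steps for a single agent run."""
    max_input_tokens: int = 100_000
    max_output_tokens: int = 20_000
    max_cost_usd: float = 0.50
    max_steps: int = 10
    input_tokens: int = 0
    output_tokens: int = 0
    steps: int = 0

    @property
    def cost_usd(self) -> float:
        return (self.input_tokens * INPUT_PRICE_PER_M
                + self.output_tokens * OUTPUT_PRICE_PER_M) / 1_000_000

    def record(self, input_tokens: int, output_tokens: int) -> None:
        """Record one LLM call; raise if any limit is now exceeded."""
        self.steps += 1
        self.input_tokens += input_tokens
        self.output_tokens += output_tokens
        for name, value, limit in [
            ("input_tokens", self.input_tokens, self.max_input_tokens),
            ("output_tokens", self.output_tokens, self.max_output_tokens),
            ("cost_usd", self.cost_usd, self.max_cost_usd),
            ("steps", self.steps, self.max_steps),
        ]:
            if value > limit:
                raise BudgetExceeded(f"{name}={value} exceeds limit {limit}")


budget = RunBudget(max_steps=3)
budget.record(1_200, 300)   # step 1: fine
budget.record(2_500, 400)   # step 2: fine
budget.record(1_000, 200)   # step 3: fine
# budget.record(500, 100)   # step 4 would raise BudgetExceeded (steps=4 > 3)
```

In the Temporal workflow above, the natural place for `record` is right after each `call_llm` Activity returns its usage numbers; `BudgetExceeded` then ends the run with a partial answer instead of an unbounded bill.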

Start with the simplest infrastructure that covers your scale — a single worker with Redis checkpointing, SSE streaming, and a token budget. Add task queues when you need concurrency, Temporal when you need business workflow orchestration, and managed platforms when you need operational simplicity.
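
The "SSE streaming" piece of that starter stack is small on the wire: each intermediate agent step becomes one `event:`/`data:` frame. A minimal formatter sketch (hypothetical helpers; any ASGI framework can yield these strings from a streaming response):

```python
import json
from typing import Iterator


def sse_event(event_type: str, payload: dict) -> str:
    """Format one agent step as a Server-Sent Events frame."""
    return f"event: {event_type}\ndata: {json.dumps(payload)}\n\n"


def stream_run(steps: Iterator[dict]) -> Iterator[str]:
    """Turn agent steps (thoughts, tool calls, observations) into SSE frames."""
    for step in steps:
        yield sse_event(step["type"], step)
    yield sse_event("done", {"type": "done"})  # terminal frame for the client


frames = list(stream_run(iter([
    {"type": "thought", "text": "Need recent pricing data"},
    {"type": "tool_call", "tool": "web_search", "args": {"q": "gpt-4o-mini price"}},
    {"type": "observation", "text": "Found pricing page"},
])))
print(frames[0])
# event: thought
# data: {"type": "thought", "text": "Need recent pricing data"}
```

The browser's `EventSource` (or any SSE client) dispatches each frame by its `event:` name, so the UI can render thoughts, tool calls, and observations differently as they arrive.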

References

  1. LangChain, “Announcing LangGraph v0.1 & LangGraph Cloud: Running Agents at Scale, Reliably,” blog.langchain.com, Jun. 2024. Available: https://blog.langchain.com/langgraph-cloud/
  2. Temporal Technologies, “Core Application — Python SDK,” docs.temporal.io, 2024. Available: https://docs.temporal.io/develop/python/core-application
  3. OpenTelemetry, “Semantic Conventions for Generative AI Systems,” opentelemetry.io, 2025. Available: https://opentelemetry.io/docs/specs/semconv/gen-ai/
  4. T. R. Sumers, S. Yao, K. Narasimhan, and T. L. Griffiths, “Cognitive Architectures for Language Agents,” TMLR, arXiv:2309.02427, 2024. Available: https://arxiv.org/abs/2309.02427
  5. Z. Xi et al., “The Rise and Potential of Large Language Model Based Agents: A Survey,” arXiv:2309.07864, 2023. Available: https://arxiv.org/abs/2309.07864
  6. LangChain, “LangSmith Observability,” docs.langchain.com, 2024. Available: https://docs.langchain.com/langsmith/observability
  7. S. Yao et al., “ReAct: Synergizing Reasoning and Acting in Language Models,” ICLR 2023, arXiv:2210.03629. Available: https://arxiv.org/abs/2210.03629

Read More

  • Understand the ReAct agent loop that these deployment patterns protect — the Thought-Action-Observation cycle, stopping conditions, and error recovery.
  • Wrap your tools for production with Tool Use and Function Calling — schema validation, error handling, and dynamic tool selection.
  • Deploy LangGraph agents with checkpointers, streaming, and human-in-the-loop — the framework-level patterns that LangGraph Platform automates.
  • Scale multi-agent orchestration to production — supervisor routing, parallel fan-out, and coordinator patterns across worker pools.
  • Persist agent memory across sessions — checkpoint memory alongside execution state for durable long-running agents.
  • Add checkpointing to planning and query decomposition — multi-step plans need step-level recovery to avoid re-executing completed sub-queries.
  • Deploy deep research agents with budget controls — open-ended investigation requires strict token budgets and cost monitoring.
  • Enforce guardrails and safety in production — rate limits, authorization gates, and input validation as production-critical safety layers.
  • Monitor deployed agents with evaluation and debugging — trajectory scoring, LangSmith traces, and automated failure classification.
  • Track production behavior with Observability for Multi-Turn LLM Conversations — the observability foundation that makes cost monitoring and debugging possible.
  • Apply FinOps Best Practices for LLM Applications — organizational patterns for managing LLM spend at scale.